Source code for hysop.operator.base.poisson_curl

# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from abc import abstractmethod
from hysop.constants import FieldProjection
from hysop.tools.numpywrappers import npw
from hysop.tools.htypes import check_instance, first_not_None, to_tuple
from hysop.tools.decorators import debug
from hysop.tools.numerics import float_to_complex_dtype
from hysop.tools.io_utils import IOParams
from hysop.fields.continuous_field import Field
from hysop.parameters.scalar_parameter import ScalarParameter
from hysop.parameters.buffer_parameter import BufferParameter
from hysop.operator.base.spectral_operator import (
    SpectralOperatorBase,
    EnergyPlotter,
    EnergyDumper,
)
from hysop.topology.cartesian_descriptor import CartesianTopologyDescriptors
from hysop.fields.continuous_field import Field
from hysop.symbolic.field import laplacian, grad
from hysop.symbolic.relational import Assignment
from hysop.core.memory.memory_request import MemoryRequest


[docs] class PoissonCurlOperatorBase: """ Solves the poisson-rotational equation using a specific implementation. """ @debug def __new__( cls, vorticity, velocity, variables, diffusion=None, dt=None, projection=None, dump_energy=None, dump_velocity_energy=None, dump_input_vorticity_energy=None, dump_output_vorticity_energy=None, plot_energy=None, plot_velocity_energy=None, plot_input_vorticity_energy=None, plot_output_vorticity_energy=None, plot_inout_vorticity_energy=None, **kwds, ): return super().__new__( cls, input_fields=None, output_fields=None, input_params=None, output_params=None, **kwds, ) @debug def __init__( self, vorticity, velocity, variables, diffusion=None, dt=None, projection=None, dump_energy=None, dump_velocity_energy=None, dump_input_vorticity_energy=None, dump_output_vorticity_energy=None, plot_energy=None, plot_velocity_energy=None, plot_input_vorticity_energy=None, plot_output_vorticity_energy=None, plot_inout_vorticity_energy=None, **kwds, ): """ PoissonCurl operator to solve incompressible flows using various fft backends. Parameters ---------- velocity : :class:`~hysop.fields.continuous_field.Field Output solution velocity field. vorticity: :class:`~hysop.fields.continuous_field.Field` Input vorticity to be diffused, projected. If diffused and/or projected, vorticity is also an output. variables: dict Dictionary of Fields as keys and topologies as values. diffusion: ScalarParameter, optional, defaults to None. Diffuse the vorticity field before applying projection and poisson velocity. dt: ScalarParameter, optional, defaults to None Timestep is only required for diffusion. If diffusion is not enabled, this parameter is ignored. projection: hysop.constants.FieldProjection or positive integer, optional Project vorticity such that resolved velocity is divergence free (for 3D fields). When active, projection is done prior to every solve, unless projection is an integer in which case it is done every given steps. This parameter is ignored for 2D fields and defaults to no projection. dump_energy: IOParams, optional, defaults to None Will set the default io parameter for all energy plotters. dump_velocity_energy: IOParams, optional, defaults to None Dump velocity field energy to a custom file. Defaults to no dump. dump_input_vorticity_energy: IOParams, optional, defaults to None Dump input vorticity field energy to a custom file. Defaults to no dump. dump_output_vorticity_energy: IOParams, optional, defaults to None Dump output vorticity field energy to a custom file. Defaults to no dump. plot_energy: IOParams, optional, defaults to None Will set the default io parameter for all energy plotters. plot_velocity_energy: IOParams, optional, defaults to None Plot velocity field energy and save the plot to a custom file. Defaults to no plot. plot_input_vorticity_energy: IOParams, optional, defaults to None Plot input vorticity field energy and save the plot to a custom file. Defaults to no plot. plot_output_vorticity_energy: IOParams, optional, defaults to None Plot output vorticity field energy and save the plot to a custom file. Defaults to no plot. plot_inout_vorticity_energy: IOParams, optional, defaults to None Plot vorticity field energy before and after diffusion and projection on the same graph. kwds : Base class parameters. Notes: ------ All dump energy arguments also enables scalar energy dumping. This is not true for energy plotting. Passing an integer instead of a IOParams will disable dump and plot arguments. """ check_instance(velocity, Field) check_instance(vorticity, Field) check_instance(variables, dict, keys=Field, values=CartesianTopologyDescriptors) check_instance(diffusion, ScalarParameter, allow_none=True) check_instance(dt, ScalarParameter, allow_none=True) check_instance(projection, (FieldProjection, int), allow_none=True) check_instance(dump_energy, (IOParams, int), allow_none=True) check_instance(dump_velocity_energy, (IOParams, int), allow_none=True) check_instance(dump_input_vorticity_energy, (IOParams, int), allow_none=True) check_instance(dump_output_vorticity_energy, (IOParams, int), allow_none=True) check_instance(plot_energy, (IOParams, int), allow_none=True) check_instance(plot_velocity_energy, (IOParams, int), allow_none=True) check_instance(plot_input_vorticity_energy, (IOParams, int), allow_none=True) check_instance(plot_output_vorticity_energy, (IOParams, int), allow_none=True) check_instance(plot_inout_vorticity_energy, (IOParams, int), allow_none=True) assert velocity.domain is vorticity.domain, "only one domain is supported" assert ( variables[velocity] is variables[vorticity] ), "only one topology is supported" # check for diffusion should_diffuse = diffusion is not None if should_diffuse: if dt is None: msg = "Diffusion has been specified but no timestep was given." raise RuntimeError(msg) else: dt = None # check for projection if ( (projection == FieldProjection.NONE) or (projection is None) or (projection == 0) or (velocity.dim != 3) ): projection = FieldProjection.NONE should_project = projection is not FieldProjection.NONE if projection == FieldProjection.NONE: def do_project(simu): return False elif projection == FieldProjection.EVERY_STEP: def do_project(simu): return True else: # projection is an integer representing frequency freq = projection if freq >= 1: def do_project(simu): return (simu.current_iteration % freq) == 0 else: msg = f"Got negative projection frequency {freq}." raise ValueError(msg) # check fields dim = velocity.dim wcomp = vorticity.nb_components if (dim == 2) and (wcomp != 1): msg = ( f"Vorticity component mistmach, got {wcomp} components but expected 1." ) raise RuntimeError(msg) if (dim == 3) and (wcomp != 3): msg = ( f"Vorticity component mistmach, got {wcomp} components but expected 3." ) raise RuntimeError(msg) if (dim != 3) and (projection != FieldProjection.NONE): raise RuntimeError("Velocity projection only available in 3D.") if velocity.dtype != vorticity.dtype: raise RuntimeError("Datatype mismatch between velocity and vorticity.") # input and output fields vtopology = variables[velocity] wtopology = variables[vorticity] input_params = set() input_fields = {vorticity: wtopology} output_fields = {velocity: vtopology} if should_diffuse: assert dt is not None, "Diffusion timestep has not been given." input_params.update({diffusion, dt}) if should_diffuse or should_project: output_fields[vorticity] = wtopology dump_Uout_E = first_not_None(dump_velocity_energy, dump_energy) dump_Win_E = first_not_None(dump_input_vorticity_energy, dump_energy) dump_Wout_E = first_not_None(dump_output_vorticity_energy, dump_energy) plot_Uout_E = first_not_None(plot_velocity_energy, plot_energy) plot_Win_E = first_not_None(plot_input_vorticity_energy, plot_energy) plot_Wout_E = first_not_None(plot_output_vorticity_energy, plot_energy) plot_Winout_E = first_not_None(plot_inout_vorticity_energy, plot_energy) do_dump_Uout_E = isinstance(dump_Uout_E, IOParams) and ( dump_Uout_E.frequency >= 0 ) do_dump_Win_E = isinstance(dump_Win_E, IOParams) and (dump_Win_E.frequency >= 0) do_dump_Wout_E = isinstance(dump_Wout_E, IOParams) and ( dump_Wout_E.frequency >= 0 ) do_plot_Uout_E = isinstance(plot_Uout_E, IOParams) and ( plot_Uout_E.frequency >= 0 ) do_plot_Win_E = isinstance(plot_Win_E, IOParams) and (plot_Win_E.frequency >= 0) do_plot_Wout_E = isinstance(plot_Wout_E, IOParams) and ( plot_Wout_E.frequency >= 0 ) do_plot_Winout_E = isinstance(plot_Winout_E, IOParams) and ( plot_Winout_E.frequency >= 0 ) do_compute_Uout_E, compute_Uout_E_freqs = EnergyDumper.do_compute_energy( dump_Uout_E, plot_Uout_E ) do_compute_Win_E, compute_Win_E_freqs = EnergyDumper.do_compute_energy( dump_Win_E, plot_Win_E, plot_Winout_E ) do_compute_Wout_E, compute_Wout_E_freqs = EnergyDumper.do_compute_energy( dump_Wout_E, plot_Wout_E, plot_Winout_E ) if do_compute_Wout_E: msg = "Cannot compute output vorticity energy because there is no output vorticity !" assert should_diffuse or should_project, msg output_params = set() compute_Win_E_param = EnergyDumper.build_energy_parameter( do_compute=do_compute_Win_E, field=vorticity, output_params=output_params, prefix="in", ) compute_Uout_E_param = EnergyDumper.build_energy_parameter( do_compute=do_compute_Uout_E, field=velocity, output_params=output_params, prefix="out", ) compute_Wout_E_param = EnergyDumper.build_energy_parameter( do_compute=do_compute_Wout_E, field=vorticity, output_params=output_params, prefix="out", ) super().__init__( input_fields=input_fields, output_fields=output_fields, input_params=input_params, output_params=output_params, **kwds, ) self.W = vorticity self.U = velocity self.dim = dim self.should_diffuse = should_diffuse self.nu = diffusion self.dt = dt self.should_project = should_project self.projection = projection self.do_project = do_project self.do_compute_energy = { "Uout": do_compute_Uout_E, "Win": do_compute_Win_E, "Wout": do_compute_Wout_E, } self.do_dump_energy = { "Uout": do_dump_Uout_E, "Win": do_dump_Win_E, "Wout": do_dump_Wout_E, } self.do_plot_energy = { "Uout": do_plot_Uout_E, "Win": do_plot_Win_E, "Wout": do_plot_Wout_E, "Winout": do_plot_Winout_E, } self.dump_energy_ioparams = { "Uout": dump_Uout_E, "Win": dump_Win_E, "Wout": dump_Wout_E, } self.plot_energy_ioparams = { "Uout": plot_Uout_E, "Win": plot_Win_E, "Wout": plot_Wout_E, "Winout": plot_Winout_E, } self.compute_energy_parameters = { "Uout": compute_Uout_E_param, "Win": compute_Win_E_param, "Wout": compute_Wout_E_param, } self.compute_energy_frequencies = { "Uout": compute_Uout_E_freqs, "Win": compute_Win_E_freqs, "Wout": compute_Wout_E_freqs, }
[docs] @debug def discretize(self): if self.discretized: return super().discretize() self.dW = self.get_input_discrete_field(self.W) self.dU = self.get_output_discrete_field(self.U)
[docs] class SpectralPoissonCurlOperatorBase(PoissonCurlOperatorBase, SpectralOperatorBase): @debug def __new__( cls, vorticity, velocity, variables, diffusion=None, dt=None, projection=None, **kwds, ): return super().__new__( cls, vorticity=vorticity, velocity=velocity, variables=variables, diffusion=diffusion, dt=dt, projection=projection, **kwds, ) @debug def __init__( self, vorticity, velocity, variables, diffusion=None, dt=None, projection=None, **kwds, ): super().__init__( vorticity=vorticity, velocity=velocity, variables=variables, diffusion=diffusion, dt=dt, projection=projection, **kwds, ) dim = self.dim should_diffuse, should_project = self.should_diffuse, self.should_project de_iops = self.dump_energy_ioparams ce_freqs = self.compute_energy_frequencies # build spectral transforms tg = self.new_transform_group() W_forward_transforms = to_tuple( tg.require_forward_transform( vorticity, compute_energy_frequencies=ce_freqs["Win"], dump_energy=de_iops["Win"], ) ) U_backward_transforms = to_tuple( tg.require_backward_transform( velocity, custom_input_buffer="B0", compute_energy_frequencies=ce_freqs["Uout"], dump_energy=de_iops["Uout"], ) ) if should_diffuse or should_project: W_backward_transforms = to_tuple( tg.require_backward_transform( vorticity, compute_energy_frequencies=ce_freqs["Wout"], dump_energy=de_iops["Wout"], ) ) else: W_backward_transforms = (None,) * dim W_Fts = npw.asarray([x.s for x in W_forward_transforms]) U_Bts = npw.asarray([x.s for x in U_backward_transforms]) W_Bts = npw.asarray( [None if (x is None) else x.s for x in W_backward_transforms] ) # generate wavenumbers kd1s = () for Wi in W_Fts: expr1 = grad(Wi, Wi.frame) kd1 = tuple( sorted(tg.push_expressions(*to_tuple(expr1)), key=lambda k: k.axis) ) kd1s += (kd1,) # laplacian kd2s = () for Wi in W_Fts: expr2 = laplacian(Wi, Wi.frame) kd2 = tuple( sorted(tg.push_expressions(*to_tuple(expr2)), key=lambda k: k.axis) ) kd2s += (kd2,) # curl if dim == 2: (W2,) = W_forward_transforms U0, U1 = U_backward_transforms exprs = ( Assignment(U0.s, +W2.s.diff(W2.s.frame.coords[1])), Assignment(U1.s, -W2.s.diff(W2.s.frame.coords[0])), ) Uin = (W2, W2) Uout = (U0, U1) Uk = tuple(tg.push_expressions(e)[0] for e in exprs) elif dim == 3: W0, W1, W2 = W_forward_transforms U0, U1, U2 = U_backward_transforms exprs = ( Assignment(U0.s, +W2.s.diff(W2.s.frame.coords[1])), Assignment(U0.s, -W1.s.diff(W1.s.frame.coords[2])), Assignment(U1.s, +W0.s.diff(W0.s.frame.coords[2])), Assignment(U1.s, -W2.s.diff(W2.s.frame.coords[0])), Assignment(U2.s, +W1.s.diff(W1.s.frame.coords[0])), Assignment(U2.s, -W0.s.diff(W0.s.frame.coords[1])), ) Uin = (W2, W1, W0, W2, W1, W0) Uout = (U0, U0, U1, U1, U2, U2) Uk = tuple(tg.push_expressions(e)[0] for e in exprs) else: raise NotImplementedError self.tg = tg self.W_forward_transforms = W_forward_transforms self.U_backward_transforms = U_backward_transforms self.W_backward_transforms = W_backward_transforms self.W_Fts = W_Fts self.U_Bts = U_Bts self.W_Bts = W_Bts self.kd1s = kd1s self.kd2s = kd2s self.Uin = Uin self.Uout = Uout self.Uk = Uk
[docs] @debug def discretize(self): if self.discretized: return super().discretize() fig_titles = { "Win": "Energy of input vorticity {}", "Uout": "Energy of output velocity {}", "Wout": "Energy of output vorticity {}", } get_transforms = { "Win": self.W_forward_transforms, "Wout": self.W_backward_transforms, "Uout": self.U_backward_transforms, } get_dfield = {"Win": self.dW, "Wout": self.dW, "Uout": self.dU} compute_fns = () for k, v in self.do_compute_energy.items(): if not v: assert not self.do_dump_energy[k] assert not self.do_plot_energy[k] continue dfield = get_dfield[k] for tr in get_transforms[k]: if tr._energy_parameter is None: msg = "Energy parameter of {}.{} has not been build." raise RuntimeError( msg.format( tr.field.name, "forward" if tr.is_forward else "backward" ) ) E_params = tuple(tr._energy_parameter for tr in get_transforms[k]) E_buffers = tuple(p.value for p in E_params) E_size = max(p.size for p in E_params) Ep = self.compute_energy_parameters[k] Ep.reallocate_buffer(shape=(E_size,), dtype=self.dU.dtype) if self.do_dump_energy[k]: iop = self.dump_energy_ioparams[k] assert iop is not None dmp = EnergyDumper(energy_parameter=Ep, io_params=iop, fname=k) else: dmp = None if self.do_plot_energy[k]: iop = self.plot_energy_ioparams[k] assert iop is not None pname = f"{self.__class__.__name__}.{dfield.pretty_name}" energy_parameters = {pname: Ep} plt = EnergyPlotter( energy_parameters=energy_parameters, io_params=iop, fname=k, fig_title=fig_titles[k].format(dfield.pretty_name), ) else: plt = None freqs = self.compute_energy_frequencies[k] iops = tuple( self.io_params.clone(frequency=f, with_last=True) for f in freqs ) def compute_fn( simulation, plt=plt, dmp=dmp, dst=Ep._value, srcs=E_buffers, iops=iops ): should_compute = any( iop.sould_dump(simulation=simulation) for iop in iops ) if should_compute: dst[...] = 0.0 for src in srcs: dst[: src.size] += src if dmp is not None: dmp.update(simulation=simulation, wait_for=None) if plt is not None: plt.update(simulation=simulation, wait_for=None) compute_fns += (compute_fn,) assert "Winout" not in self.do_compute_energy assert "Winout" not in self.do_dump_energy if self.do_plot_energy["Winout"]: iop = self.plot_energy_ioparams["Winout"] assert iop is not None pname_in = f"{self.__class__.__name__}.input.{self.dW.pretty_name}" pname_out = f"{self.__class__.__name__}.output.{self.dW.pretty_name}" energy_parameters = { pname_in: self.compute_energy_parameters["Win"], pname_out: self.compute_energy_parameters["Wout"], } plt = EnergyPlotter( energy_parameters=energy_parameters, io_params=iop, fname="Winout", fig_title=f"Input and output vorticity energy (diffusion={self.nu():.2e}, project={self.projection})", ) def compute_fn_Winout(simulation, plt=plt): plt.update(simulation=simulation, wait_for=None) compute_fns += (compute_fn_Winout,) def update_energy(simulation, compute_fns=compute_fns): for fn in compute_fns: fn(simulation=simulation) self.update_energy = update_energy kd1s, kd2s = self.kd1s, self.kd2s if self.should_project: dkd1s = () for kd1 in kd1s: dkd1 = [ None, ] * len(kd1) for wi in kd1: idx, dkd, nd_dkd = self.tg.discrete_wave_numbers[wi] dkd1[idx] = dkd dkd1s += (tuple(dkd1),) else: dkd1s = None dkd2s = () for kd2 in kd2s: dkd2 = [ None, ] * len(kd2) for wi in kd2: idx, dkd, nd_dkd = self.tg.discrete_wave_numbers[wi] dkd2[idx] = dkd dkd2s += (tuple(dkd2),) dUk = () for ki in self.Uk: _, dki, _ = self.tg.discrete_wave_numbers[ki] dUk += (dki,) self.dkd1s = dkd1s self.dkd2s = dkd2s self.dUk = dUk
[docs] def get_work_properties(self): requests = super().get_work_properties() Ut = self.U_backward_transforms dtypes = tuple(tr.input_dtype for tr in Ut) shapes = tuple(tr.input_shape for tr in Ut) assert all(d == dtypes[0] for d in dtypes) # we request one buffer per vorticity component (1 in 2D, 3 in 3D) for i, (Ft, Bt) in enumerate( zip(self.W_forward_transforms, self.W_backward_transforms) ): assert Ft.output_dtype == dtypes[0] if Bt is not None: assert Ft.backend is Bt.backend assert Ft.output_dtype == Bt.input_dtype, ( Ft.output_dtype, Bt.input_dtype, ) assert Ft.output_shape == Bt.input_shape, ( Ft.output_shape, Bt.input_shape, ) shape = Ft.output_shape dtype = Ft.output_dtype request = MemoryRequest( backend=self.tg.backend, dtype=dtype, shape=shape, nb_components=1, alignment=self.min_fft_alignment, ) requests.push_mem_request(f"fft_buffer_{i}", request) return requests
[docs] def setup(self, work): dim = self.dim Ks, KKs = self.dkd1s, self.dkd2s W_forward_transforms = self.W_forward_transforms W_backward_transforms = self.W_backward_transforms U_backward_transforms = self.U_backward_transforms for i, (W_Ft, W_Bt) in enumerate( zip(W_forward_transforms, W_backward_transforms) ): (dtmp,) = work.get_buffer(self, f"fft_buffer_{i}") W_Ft.configure_output_buffer(dtmp) if W_Bt is not None: W_Bt.configure_input_buffer(dtmp) output_axes = W_Ft.output_axes super().setup(work) # extract buffers reorder_fields = tuple(dim - 1 - i for i in output_axes) if dim == 2: K = Ks KK = KKs WIN = (W_forward_transforms[0].full_output_buffer,) elif dim == 3: K = tuple(Ks[i] for i in reorder_fields) if Ks else None KK = tuple(KKs[i] for i in reorder_fields) WIN = tuple( W_forward_transforms[i].full_output_buffer for i in reorder_fields ) else: raise NotImplementedError self.WIN = WIN self.K = K self.KK = KK UIN, UOUT, UK = (), (), () assert len(self.Uin) == len(self.Uout) == len(self.dUk) for i, (Uin, Uout, Uk) in enumerate(zip(self.Uin, self.Uout, self.dUk)): Uin = Uin.full_output_buffer Uout = Uout.full_input_buffer UIN += (Uin,) UOUT += (Uout,) UK += (Uk,) self.UIN = UIN self.UOUT = UOUT self.UK = UK